package docker_rocket

import (
	"bytes"
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	jsoniter "github.com/json-iterator/go"
	"github.com/pkg/errors"
	uuid "github.com/satori/go.uuid"
	"github.com/spf13/viper"
	"io/ioutil"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Keys used in the QueueRun map, the values stored under APPSTATUS, and the
// program name reported to the backend.
const (
	QUEUENAME   = "QueueName"   // key: command name passed to NormalRun (part before ':')
	QUEUETOPIC  = "QueueTopic"  // key: topic of the consumer started by NormalRun
	QUEUECONFIG = "ConstConfig" // key: the *QueueConfig read by NormalRun and the heartbeat
	APPMODE     = "APPMODE"     // key: run mode, cmd / docker; StopRunning checks for "docker"
	APPUUID     = "AppUUID"     // key: instance UUID generated in init
	APPSTATUS   = "AppStatus"   // key: application status, one of WAITING / PERFORMED / GOTERR / SHUTDOWN
	APPMSG      = "AppMsg"      // key: application error message reported in the heartbeat
	TASKID      = "TaskId"      // key: id (int) of the task started by heartbeatRun
	COUNTKEY    = "CountKey"    // key: name passed to DecrRemain / GetRemain

	WAITING   = "waiting"   // no task running; set in init and by StopRunning(false)
	PERFORMED = "performed" // set by heartbeatRun after NormalRun succeeded
	GOTERR    = "gotErr"    // set by heartbeatRun after NormalRun failed
	SHUTDOWN  = "shutdown"  // set by StopRunning(true)

	PRAGRAM = "ck-api" // program name sent as pro_name in heartbeat and open_queue requests
)

func init() {
	QueueRun.Store(APPUUID, uuid.NewV4().String())
	QueueRun.Store(APPSTATUS, WAITING)
}

var (
	// queueLists maps a command name to the ProcessorInterface registered for it.
	queueLists sync.Map
	// QueueRun holds the parameters and state used while running (see the key constants).
	QueueRun sync.Map
	// doneCount is incremented after each successfully consumed batch; it is
	// accessed through sync/atomic and reset by NormalRun.
	doneCount int64
)

// Register associates processorInterface with cmdName so that NormalRun and
// StopRunning can look it up later. Registering the same name again replaces
// the previous processor.
func Register(cmdName string, processorInterface ProcessorInterface) {
	queueLists.Store(cmdName, processorInterface)
}

// QueueConfig is the configuration expected (as *QueueConfig) under
// QueueRun[QUEUECONFIG].
type QueueConfig struct {
	Type          string            // not read in this file
	Host          string            // name-server addresses separated by ";" (split in NormalRun)
	PanicHandler  func(err error)   // optional; called by process with a recovered panic
	ErrorHandler  func(err error)   // optional; called by HeartbeatRunning when the heartbeat fails
	DefaultConfig map[string]string // base settings copied into ConfigMap by heartbeatRun
	ConfigMap     map[string]string // effective settings read by NormalRun (GroupName, Topic, Tag, ...)
}

func NormalRun(cmdName string) error {
	// 冒号用来区分tag，或者其他
	if strings.Contains(cmdName, ":") {
		cmdQueues := strings.Split(cmdName, ":")
		cmdName = cmdQueues[0]
	}
	atomic.StoreInt64(&doneCount, 0)
	var consumerP ProcessorInterface
	if v, ok := queueLists.Load(cmdName); ok {
		consumerP = v.(ProcessorInterface)
	} else {
		return errors.New("queue fun not exists")
	}
	// 保存当前执行的 cmdName
	QueueRun.Store(QUEUENAME, cmdName)

	err := consumerP.Init()
	if err != nil {
		return err
	}
	cfgI, ok := QueueRun.Load(QUEUECONFIG)
	if !ok {
		return errors.New("queue config not exists")
	}
	cfg := cfgI.(*QueueConfig)
	hosts := strings.Split(cfg.Host, ";")

	opts := make([]consumer.Option, 0)
	opts = append(opts, consumer.WithNsResolver(primitive.NewPassthroughResolver(hosts)), consumer.WithAutoCommit(true))
	if CountKey, ok := cfg.ConfigMap["CountKey"]; ok {
		QueueRun.Store(COUNTKEY, CountKey)
	} else {
		QueueRun.Store(COUNTKEY, consumerP.QueueName())
	}

	if Model, ok := cfg.ConfigMap["Model"]; ok {
		if Model == "BroadCasting" {
			opts = append(opts, consumer.WithConsumerModel(consumer.BroadCasting))
		} else {
			opts = append(opts, consumer.WithConsumerModel(consumer.Clustering))
		}
	} else {
		opts = append(opts, consumer.WithConsumerModel(consumer.Clustering))
	}

	orderly := false
	if ConsumerOrder, ok := cfg.ConfigMap["ConsumerOrder"]; ok {
		if ConsumerOrder == "true" {
			orderly = true
			opts = append(opts, consumer.WithConsumerOrder(true))
		}
	}

	if Namespace, ok := cfg.ConfigMap["Namespace"]; ok {
		opts = append(opts, consumer.WithNamespace(Namespace))
	}

	if FromWhere, ok := cfg.ConfigMap["FromWhere"]; ok {
		if FromWhere == "ConsumeFromLastOffset" {
			opts = append(opts, consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset))
		}
		if FromWhere == "ConsumeFromFirstOffset" {
			opts = append(opts, consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset))
		}
		if FromWhere == "ConsumeFromTimestamp" {
			opts = append(opts, consumer.WithConsumeFromWhere(consumer.ConsumeFromTimestamp))
		}
	}

	if Strategy, ok := cfg.ConfigMap["Strategy"]; ok {
		if Strategy == "All" {
			opts = append(opts, consumer.WithStrategy(func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
				return mqAll
			}))
		}
	}
	if maxReconsumeTimes, ok := cfg.ConfigMap["MaxReconsumeTimes"]; ok {
		maxReconsumeTimesInt, _ := strconv.Atoi(maxReconsumeTimes)
		opts = append(opts, consumer.WithMaxReconsumeTimes(int32(maxReconsumeTimesInt)))
		// opts = append(opts, consumer.WithSuspendCurrentQueueTimeMillis(2 * time.Second)) // sdk bug,暂时是没什么卵用
	}
	if retry, ok := cfg.ConfigMap["Retry"]; ok {
		retryInt, _ := strconv.Atoi(retry)
		opts = append(opts, consumer.WithRetry(retryInt))
	}
	if size, ok := cfg.ConfigMap["ConsumeMessageBatchMaxSize"]; ok {
		sizeInt, _ := strconv.Atoi(size)
		opts = append(opts, consumer.WithConsumeMessageBatchMaxSize(sizeInt))
	}
	if groupName, ok := cfg.ConfigMap["GroupName"]; ok {
		opts = append(opts, consumer.WithGroupName(groupName))
	} else {
		return errors.New("empty group name")
	}
	if topic, ok := cfg.ConfigMap["Topic"]; ok {
		consumerP.SetQueueName(topic)
	}

	selector := consumer.MessageSelector{}
	if tagExp, ok := cfg.ConfigMap["Tag"]; ok {
		selector.Type = consumer.TAG
		selector.Expression = tagExp
	}

	rocket, err := rocketmq.NewPushConsumer(opts...)
	if err != nil {
		return err
	}
	if orderly {
		// 顺序模式
		err = rocket.Subscribe(consumerP.QueueName(), selector, func(ctx context.Context,
			msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for i := range msgs {
				err := process(consumerP, msgs[i])
				if err != nil {
					return consumer.SuspendCurrentQueueAMoment, err
					// fmt.Println(err)
				}
			}
			defer atomic.AddInt64(&doneCount, 1)
			if v, ok := QueueRun.Load(COUNTKEY); ok {
				qn := v.(string)
				if !strings.HasSuffix(qn, ";false") {
					DecrRemain(qn)
				}
			}
			return consumer.ConsumeSuccess, nil
		})
		if err != nil {
			return err
		}
	} else {
		// 并行模式
		err = rocket.Subscribe(consumerP.QueueName(), selector, func(ctx context.Context,
			msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for i := range msgs {
				err := process(consumerP, msgs[i])
				if err != nil {
					return consumer.ConsumeRetryLater, nil
				}
			}
			defer atomic.AddInt64(&doneCount, 1)
			if v, ok := QueueRun.Load(COUNTKEY); ok {
				qn := v.(string)
				if !strings.HasSuffix(qn, ";false") {
					DecrRemain(qn)
				}
			}
			return consumer.ConsumeSuccess, nil
		})
		if err != nil {
			return err
		}
	}

	err = rocket.Start()
	if err != nil {
		return err
	}
	QueueRun.Store(QUEUETOPIC, consumerP.QueueName())
	QueueRun.Store(consumerP.QueueName(), rocket)
	return nil
}

// process hands msg to consumerP.Consumer and returns its error.
//
// A panic raised by the consumer is recovered, converted to an error, passed
// to the configured PanicHandler (if any) and returned to the caller, so a
// message whose handler panicked is not reported as successfully consumed.
func process(consumerP ProcessorInterface, msg *primitive.MessageExt) (err error) {
	defer func() {
		info := recover()
		if info == nil {
			return
		}
		// Assigning the named result makes the panic visible to the caller.
		switch x := info.(type) {
		case error:
			err = x
		case string:
			err = errors.New(x)
		default:
			// Keep the panic value instead of producing an empty error.
			err = errors.Errorf("panic: %v", info)
		}
		cfgI, ok := QueueRun.Load(QUEUECONFIG)
		if !ok {
			return
		}
		if cfg := cfgI.(*QueueConfig); cfg != nil && cfg.PanicHandler != nil {
			cfg.PanicHandler(err)
		}
	}()
	return consumerP.Consumer(msg)
}

// StopRunning unsubscribes and shuts down the consumer started by NormalRun.
//
// With shutdown == true the application status becomes SHUTDOWN and, when
// QueueRun[APPMODE] is "docker", one heartbeat is sent to report it; otherwise
// the status returns to WAITING. The task id, the topic and the stored consumer
// are removed from QueueRun afterwards. The results of Unsubscribe and
// Shutdown are only printed.
//
// An error is returned when no topic, processor or consumer is recorded.
func StopRunning(shutdown bool) error {
	var cmdName string
	if v, ok := QueueRun.Load(QUEUENAME); ok {
		cmdName = v.(string)
	}

	topicV, ok := QueueRun.Load(QUEUETOPIC)
	if !ok {
		return errors.New("[stop] queue topic not exists")
	}
	queueTopic := topicV.(string)

	processorV, ok := queueLists.Load(cmdName)
	if !ok {
		return errors.New("[stop] fun not exists")
	}
	consumerP := processorV.(ProcessorInterface)

	rocketV, ok := QueueRun.Load(queueTopic)
	if !ok {
		return errors.New("[stop] rocket not exists")
	}
	rocketConsumer := rocketV.(rocketmq.PushConsumer)

	fmt.Println(rocketConsumer.Unsubscribe(consumerP.QueueName()))
	// NOTE(review): presumably gives in-flight messages time to finish before
	// the consumer is shut down — confirm.
	time.Sleep(3 * time.Second)
	fmt.Println(rocketConsumer.Shutdown())

	if shutdown {
		QueueRun.Store(APPSTATUS, SHUTDOWN)
		if v, ok := QueueRun.Load(APPMODE); ok {
			if v == "docker" {
				HeartbeatRunning()
			}
		}
	} else {
		QueueRun.Store(APPSTATUS, WAITING)
	}
	// QUEUENAME is deliberately kept.
	QueueRun.Delete(TASKID)
	QueueRun.Delete(QUEUETOPIC)
	// Drop the shut-down consumer as well; once QUEUETOPIC is gone nothing can
	// reach it and it would otherwise stay referenced forever.
	QueueRun.Delete(queueTopic)
	return nil
}

// HeartbeatRunning reports the current state (uuid, status, message, task id,
// topic, remaining and done counters) to the backend and acts on the answer:
//
//   - "wait":     nothing to do
//   - "stop":     stop the running consumer via StopRunning(false)
//   - "welldone": start the assigned task via heartbeatRun, but only while the
//     application is in the WAITING status
//
// Nothing is sent when no *QueueConfig is stored. Heartbeat failures, including
// a response without data, are passed to the configured ErrorHandler.
func HeartbeatRunning() {
	// Collect the current state.
	info := reqStruct{
		ProName: PRAGRAM,
	}
	if v, ok := QueueRun.Load(APPUUID); ok {
		info.Uuid = v.(string)
	}
	if v, ok := QueueRun.Load(APPSTATUS); ok {
		info.Status = v.(string)
	}
	if v, ok := QueueRun.Load(APPMSG); ok {
		info.Msg = v.(string)
	}
	if v, ok := QueueRun.Load(TASKID); ok {
		info.TaskId = v.(int)
	}
	if v, ok := QueueRun.Load(QUEUETOPIC); ok {
		info.Topic = v.(string)
	}
	cfgI, ok := QueueRun.Load(QUEUECONFIG)
	if !ok {
		return
	}
	cfg := cfgI.(*QueueConfig)
	reportErr := func(err error) {
		if cfg != nil && cfg.ErrorHandler != nil {
			cfg.ErrorHandler(err)
		}
	}

	if v, ok := QueueRun.Load(COUNTKEY); ok {
		info.Remain = GetRemain(v.(string))
	}
	info.Done = atomic.LoadInt64(&doneCount)

	resp, err := httpPostHeartbeat(&info)
	if err != nil {
		reportErr(err)
		return
	}
	if resp == nil {
		// A successful reply may still carry no data; dereferencing it would panic.
		reportErr(errors.New("heartbeat response has no data"))
		return
	}

	switch resp.Status {
	case "stop":
		fmt.Println(StopRunning(false))
	case "welldone":
		/*
			Disabled: switching to a different task while one is running.

			if info.Status == PERFORMED && resp.TaskId != info.TaskId {
				fmt.Println(StopRunning(false))
				time.Sleep(3 * time.Second)
				fmt.Println(heartbeatRun(resp))
				return
			}
		*/
		if info.Status == WAITING {
			fmt.Println(heartbeatRun(resp))
		}
	}
}

// heartbeatRun starts the task described by a heartbeat response.
//
// The effective ConfigMap is rebuilt from DefaultConfig overlaid with
// resp.Config, then NormalRun(resp.CmdQueue) is called after a one second
// pause. The outcome is recorded in QueueRun (APPSTATUS, APPMSG and, on
// success, TASKID) so the next heartbeat reports it; a NormalRun failure is
// also returned to the caller.
func heartbeatRun(resp *respStruct) error {
	if resp == nil {
		return errors.New("heartbeat response is empty")
	}
	cfgI, ok := QueueRun.Load(QUEUECONFIG)
	if !ok {
		return errors.New("queue config not exists")
	}
	cfg := cfgI.(*QueueConfig)
	if cfg == nil {
		return errors.New("queue config not exists")
	}

	// Build the merged map first and publish it in one assignment. cfg is the
	// pointer already held by QueueRun, so it does not need to be stored again.
	merged := make(map[string]string, len(cfg.DefaultConfig)+len(resp.Config))
	for k, v := range cfg.DefaultConfig {
		merged[k] = v
	}
	for k, v := range resp.Config {
		merged[k] = v
	}
	cfg.ConfigMap = merged

	time.Sleep(1 * time.Second)
	if err := NormalRun(resp.CmdQueue); err != nil {
		QueueRun.Store(APPSTATUS, GOTERR)
		QueueRun.Store(APPMSG, err.Error())
		return err
	}
	QueueRun.Store(APPSTATUS, PERFORMED)
	QueueRun.Store(TASKID, resp.TaskId)
	QueueRun.Store(APPMSG, "")
	return nil
}

// respStruct is the "data" part of a heartbeat response.
type respStruct struct {
	Uuid     string            `json:"uuid"`                // not read in this file
	Status   string            `json:"status"`              // "wait": keep waiting; "stop": stop the current task; "welldone": go ahead and start
	TaskId   int               `json:"task_id,omitempty"`   // stored under TASKID once the task has started
	CmdQueue string            `json:"cmd_queue,omitempty"` // command name handed to NormalRun
	Config   map[string]string `json:"config,omitempty"`    // settings overlaid on DefaultConfig by heartbeatRun
}

// reqStruct is the heartbeat request body assembled by HeartbeatRunning.
type reqStruct struct {
	ProName string `json:"pro_name"`          // always PRAGRAM
	Uuid    string `json:"uuid"`              // value stored under APPUUID
	Status  string `json:"status"`            // value stored under APPSTATUS
	TaskId  int    `json:"task_id,omitempty"` // value stored under TASKID, if any
	Topic   string `json:"topic,omitempty"`   // value stored under QUEUETOPIC, if any
	Done    int64  `json:"done,omitempty"`    // current value of doneCount
	Msg     string `json:"msg,omitempty"`     // value stored under APPMSG, if any
	Remain  int    `json:"remain"`            // GetRemain of the stored count key
}

// respNormal is the envelope of the backend's heartbeat response.
type respNormal struct {
	Code      int         `json:"code"`      // 200 is treated as success by httpPostHeartbeat
	Data      *respStruct `json:"data"`      // payload returned to the caller on success
	Msg       string      `json:"msg"`       // used as the error text when Code is not 200
	RequestId string      `json:"requestId"` // not read in this file
}

// httpPostHeartbeat posts info as JSON to
// <backend.apiHost>/api/v3/opsQueue/heartbeat with a 2 second timeout and
// returns the "data" part of the reply.
//
// An error is returned when the request cannot be encoded or sent, the body
// cannot be read or decoded, the reply code is not 200, or the reply carries
// no data. The returned *respStruct is non-nil whenever the error is nil.
func httpPostHeartbeat(info *reqStruct) (*respStruct, error) {
	// hUrl := "http://localhost:8077/api/v3/opsQueue/heartbeat"
	hUrl := viper.GetString("backend.apiHost") + "/api/v3/opsQueue/heartbeat"
	payload, err := jsoniter.Marshal(info)
	if err != nil {
		return nil, errors.Wrap(err, "encode heartbeat request")
	}
	req, err := http.NewRequest("POST", hUrl, bytes.NewBuffer(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	reqCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// WithContext returns a copy of the request; without the assignment the
	// timeout is never applied.
	req = req.WithContext(reqCtx)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	respS := &respNormal{}
	if err := jsoniter.Unmarshal(body, respS); err != nil {
		return nil, errors.Wrap(err, "decode heartbeat response")
	}
	if respS.Code != 200 {
		if respS.Msg == "" {
			// Avoid handing an empty error message to the caller.
			return nil, errors.Errorf("heartbeat rejected with code %d", respS.Code)
		}
		return nil, errors.New(respS.Msg)
	}
	if respS.Data == nil {
		return nil, errors.New("heartbeat response has no data")
	}
	return respS.Data, nil
}

// OpsQueueOpen is the request body sent by HttpPostOpen.
type OpsQueueOpen struct {
	ProName  string            `json:"pro_name"`  // overwritten with PRAGRAM by HttpPostOpen
	CmdQueue string            `json:"cmd_queue"` // NOTE(review): presumably the command name to run, as in respStruct.CmdQueue — confirm
	Config   map[string]string `json:"config"`    // NOTE(review): presumably consumer settings for that queue — confirm
}

// HttpPostOpen posts info as JSON to
// <backend.apiHost>/api/v3/opsQueue/open_queue with a 2 second timeout.
// info.ProName is overwritten with PRAGRAM before sending.
//
// An error is returned when info is nil, the request cannot be encoded or
// sent, or the server answers with an HTTP error status (>= 400). The response
// body is not interpreted.
func HttpPostOpen(info *OpsQueueOpen) error {
	if info == nil {
		return errors.New("open queue info is nil")
	}
	info.ProName = PRAGRAM
	hUrl := viper.GetString("backend.apiHost") + "/api/v3/opsQueue/open_queue"
	payload, err := jsoniter.Marshal(info)
	if err != nil {
		return errors.Wrap(err, "encode open_queue request")
	}
	req, err := http.NewRequest("POST", hUrl, bytes.NewBuffer(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	reqCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// WithContext returns a copy of the request; without the assignment the
	// timeout is never applied.
	req = req.WithContext(reqCtx)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Drain the body so the underlying connection can be reused; the content
	// itself is not needed, so a read error here is deliberately ignored.
	_, _ = ioutil.ReadAll(resp.Body)
	if resp.StatusCode >= http.StatusBadRequest {
		return errors.Errorf("open_queue request failed: %s", resp.Status)
	}
	return nil
}
